Crate datafusion


DataFusion is an extensible query engine written in Rust that uses Apache Arrow as its in-memory format. It helps developers build very fast, feature-rich database and analytic systems customized to particular workloads.

“Out of the box,” DataFusion quickly runs complex SQL and DataFrame queries using a sophisticated query planner, a columnar, multi-threaded, vectorized execution engine, and partitioned data sources (Parquet, CSV, JSON, and Avro).

DataFusion is designed for easy customization such as supporting additional data sources, query languages, functions, custom operators and more. See the Architecture section for more details.

Examples

The main entry point for interacting with DataFusion is the SessionContext. Exprs represent expressions such as a + b.

DataFrame

To execute a query against data stored in a CSV file using a DataFrame:


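use datafusion::prelude::*;
use datafusion::arrow::record_batch::RecordBatch;

// the statements below run inside an async function that returns datafusion::error::Result<()>
let ctx = SessionContext::new();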

// create the dataframe
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;

// create a plan
let df = df.filter(col("a").lt_eq(col("b")))?
           .aggregate(vec![col("a")], vec![min(col("b"))])?
           .limit(0, Some(100))?;

// execute the plan
let results: Vec<RecordBatch> = df.collect().await?;

// format the results
let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?
   .to_string();

let expected = vec![
    "+---+----------------+",
    "| a | MIN(?table?.b) |",
    "+---+----------------+",
    "| 1 | 2              |",
    "+---+----------------+"
];

assert_eq!(pretty_results.trim().lines().collect::<Vec<_>>(), expected);

SQL

To execute a query against a CSV file using SQL:


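use datafusion::prelude::*;
use datafusion::arrow::record_batch::RecordBatch;

// the statements below run inside an async function that returns datafusion::error::Result<()>
let ctx = SessionContext::new();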

ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;

// create a plan
let df = ctx.sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100").await?;

// execute the plan
let results: Vec<RecordBatch> = df.collect().await?;

// format the results
let pretty_results = arrow::util::pretty::pretty_format_batches(&results)?
  .to_string();

let expected = vec![
    "+---+----------------+",
    "| a | MIN(example.b) |",
    "+---+----------------+",
    "| 1 | 2              |",
    "+---+----------------+"
];

assert_eq!(pretty_results.trim().lines().collect::<Vec<_>>(), expected);

More Examples

There are many additional annotated examples of using DataFusion in the datafusion-examples directory.

Customization and Extension

DataFusion is designed to be a “disaggregated” query engine. This means that developers can mix and extend the parts of DataFusion they need for their use case, for example just the ExecutionPlan operators, or the SqlToRel SQL planner and optimizer.

To achieve this, DataFusion supports extension at many points. You can find examples of each of them in the datafusion-examples directory.

Architecture

Overview Presentations

The following presentations offer high level overviews of the different components and how they interact together.

  • [Apr 2023]: The Apache Arrow DataFusion Architecture talks
  • [July 2022]: DataFusion and Arrow: Supercharge Your Data Analytical Tool with a Rusty Query Engine: recording and slides
  • [March 2021]: The DataFusion architecture is described in Query Engine Design and the Rust-Based DataFusion in Apache Arrow: recording (DataFusion content starts ~ 15 minutes in) and slides
  • [February 2021]: How DataFusion is used within the Ballista Project is described in Ballista: Distributed Compute with Rust and Apache Arrow: recording

Query Planning and Execution Overview

SQL

                Parsed with            SqlToRel creates
                sqlparser              initial plan
┌───────────────┐           ┌─────────┐             ┌─────────────┐
│   SELECT *    │           │Query {  │             │Project      │
│   FROM ...    │──────────▶│..       │────────────▶│  TableScan  │
│               │           │}        │             │    ...      │
└───────────────┘           └─────────┘             └─────────────┘

  SQL String                 sqlparser               LogicalPlan
                             AST nodes
  1. The query string is parsed to an Abstract Syntax Tree (AST) Statement using sqlparser.

  2. The AST is converted to a LogicalPlan and logical expressions Exprs to compute the desired result by the SqlToRel planner.

DataFrame

When executing plans using the DataFrame API, the process is the same as with SQL, except that the DataFrame API builds the LogicalPlan directly using LogicalPlanBuilder. Systems that have their own custom query languages typically also build the LogicalPlan directly.
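To make the idea of building a plan directly (without parsing SQL) concrete, here is a minimal sketch using only the Rust standard library. `ToyPlan` and `ToyPlanBuilder` are hypothetical names invented for this illustration; they are not DataFusion's LogicalPlan or LogicalPlanBuilder, whose real APIs are richer and schema aware.

```rust
// Toy stand-in for a logical plan: each node owns its single input.
// These types are illustrative assumptions, not DataFusion types.
#[derive(Debug, Clone, PartialEq)]
enum ToyPlan {
    Scan { table: String },
    Filter { predicate: String, input: Box<ToyPlan> },
    Limit { fetch: usize, input: Box<ToyPlan> },
}

// Builder that wraps the plan built so far in a new parent node per call.
struct ToyPlanBuilder {
    plan: ToyPlan,
}

impl ToyPlanBuilder {
    // Start from a leaf node that reads a named table.
    fn scan(table: &str) -> Self {
        Self { plan: ToyPlan::Scan { table: table.to_string() } }
    }

    // Add a Filter node on top of the current plan.
    fn filter(self, predicate: &str) -> Self {
        Self {
            plan: ToyPlan::Filter {
                predicate: predicate.to_string(),
                input: Box::new(self.plan),
            },
        }
    }

    // Add a Limit node on top of the current plan.
    fn limit(self, fetch: usize) -> Self {
        Self { plan: ToyPlan::Limit { fetch, input: Box::new(self.plan) } }
    }

    // Hand back the finished plan tree.
    fn build(self) -> ToyPlan {
        self.plan
    }
}
```

Calling `ToyPlanBuilder::scan("example").filter("a <= b").limit(100).build()` yields a `Limit` node wrapping a `Filter` node wrapping a `Scan`, the same shape a SQL front end would produce for an equivalent query.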

Planning

            AnalyzerRules and      PhysicalPlanner          PhysicalOptimizerRules
            OptimizerRules         creates ExecutionPlan    improve performance
            rewrite plan
┌─────────────┐        ┌─────────────┐      ┌───────────────┐        ┌───────────────┐
│Project      │        │Project(x, y)│      │ProjectExec    │        │ProjectExec    │
│  TableScan  │──...──▶│  TableScan  │─────▶│  ...          │──...──▶│  ...          │
│    ...      │        │    ...      │      │    ParquetExec│        │    ParquetExec│
└─────────────┘        └─────────────┘      └───────────────┘        └───────────────┘

 LogicalPlan            LogicalPlan         ExecutionPlan             ExecutionPlan

To process large datasets with many rows as efficiently as possible, significant effort is spent planning and optimizing, in the following manner:

  1. The LogicalPlan is checked and rewritten to enforce semantic rules, such as type coercion, by AnalyzerRules.

  2. The LogicalPlan is rewritten by OptimizerRules, such as projection and filter pushdown, to improve its efficiency.

  3. The LogicalPlan is converted to an ExecutionPlan by a PhysicalPlanner.

  4. The ExecutionPlan is rewritten by PhysicalOptimizerRules, such as sort and join selection, to improve its efficiency.
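As a rough illustration of step 2, the sketch below implements a simplified filter pushdown over a toy plan type. The `Plan` enum and `push_down_filters` function are assumptions made for this example and do not reflect DataFusion's OptimizerRule trait or its actual pushdown logic, which handles many more node types and conditions.

```rust
// Toy model of a logical optimizer rule. These types are illustrative
// stand-ins, not DataFusion's LogicalPlan or OptimizerRule.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    // Leaf: read a table, optionally applying predicates while scanning.
    Scan { table: String, filters: Vec<String> },
    // Keep only rows matching `predicate`.
    Filter { predicate: String, input: Box<Plan> },
    // Keep only the named columns.
    Project { columns: Vec<String>, input: Box<Plan> },
}

// Filter pushdown: a Filter whose (rewritten) input is a Scan is folded into
// that Scan, so rows are discarded as early as possible. Other nodes are
// rebuilt around their rewritten children.
fn push_down_filters(plan: Plan) -> Plan {
    match plan {
        Plan::Filter { predicate, input } => match push_down_filters(*input) {
            Plan::Scan { table, mut filters } => {
                filters.push(predicate);
                Plan::Scan { table, filters }
            }
            other => Plan::Filter { predicate, input: Box::new(other) },
        },
        Plan::Project { columns, input } => Plan::Project {
            columns,
            input: Box::new(push_down_filters(*input)),
        },
        scan @ Plan::Scan { .. } => scan,
    }
}

// Builds Project(Filter(Scan)) and returns the optimized plan.
fn example_pushdown() -> Plan {
    let plan = Plan::Project {
        columns: vec!["a".to_string()],
        input: Box::new(Plan::Filter {
            predicate: "a <= b".to_string(),
            input: Box::new(Plan::Scan { table: "example".to_string(), filters: vec![] }),
        }),
    };
    push_down_filters(plan)
}
```

In this sketch the rewritten plan is a `Project` directly over a `Scan` that carries the predicate, and applying the rule a second time leaves the plan unchanged.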

Data Sources

Planning       │
requests       │            TableProvider::scan
information    │            creates an
such as schema │            ExecutionPlan
               │
               ▼
  ┌─────────────────────────┐         ┌──────────────┐
  │                         │         │              │
  │impl TableProvider       │────────▶│ParquetExec   │
  │                         │         │              │
  └─────────────────────────┘         └──────────────┘
        TableProvider
        (built in or user provided)    ExecutionPlan

DataFusion includes several built-in data sources for common use cases, and can be extended by implementing the TableProvider trait. A TableProvider provides information for planning and an ExecutionPlan for execution.

  1. ListingTable: Reads data from Parquet, JSON, CSV, or Avro files. Supports single files or multiple files with Hive-style partitioning, optional compression, reading directly from remote object stores, and more.

  2. MemTable: Reads data from in memory RecordBatches.

  3. StreamingTable: Reads data from potentially unbounded inputs.
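The two responsibilities of a data source (answering planning questions and producing something executable) can be sketched with a small std-only model. `ToyTableProvider`, `ToyScan`, and `VecTable` are hypothetical names for this illustration; DataFusion's real TableProvider trait is async and returns an ExecutionPlan, so treat this only as a picture of the division of labor.

```rust
// Toy model of the data source extension point. All names here are
// hypothetical; DataFusion's real TableProvider has a different signature.
#[derive(Debug, Clone, PartialEq)]
struct ToyScan {
    columns: Vec<String>,
    rows: Vec<Vec<i64>>,
}

trait ToyTableProvider {
    // Planning-time information: the column names of the table.
    fn schema(&self) -> Vec<String>;
    // Produce something executable that reads only the requested columns.
    fn scan(&self, projection: &[usize]) -> ToyScan;
}

// An in-memory table, loosely analogous in spirit to MemTable.
struct VecTable {
    columns: Vec<String>,
    rows: Vec<Vec<i64>>,
}

impl ToyTableProvider for VecTable {
    fn schema(&self) -> Vec<String> {
        self.columns.clone()
    }

    fn scan(&self, projection: &[usize]) -> ToyScan {
        ToyScan {
            // Keep only the projected column names, in the requested order.
            columns: projection.iter().map(|&i| self.columns[i].clone()).collect(),
            // Keep only the projected values of every row.
            rows: self
                .rows
                .iter()
                .map(|row| projection.iter().map(|&i| row[i]).collect())
                .collect(),
        }
    }
}
```

Passing the projection into `scan` lets the source skip columns the query never reads, which is the reason a planner would ask the provider for its schema before requesting a scan.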

Plans

Logical planning yields LogicalPlan nodes and Expr expressions, which are Schema aware and represent statements independent of how they are physically executed. A LogicalPlan is a Directed Acyclic Graph (DAG) of other LogicalPlans, each potentially containing embedded Exprs.

An ExecutionPlan (sometimes referred to as a “physical plan”) is a plan that can be executed against data. It is a DAG of other ExecutionPlans, each potentially containing expressions of the following types:

  1. PhysicalExpr: Scalar functions

  2. AggregateExpr: Aggregate functions

  3. WindowExpr: Window functions

Compared to a LogicalPlan, an ExecutionPlan has concrete information about how to perform calculations (e.g. hash vs merge join), and how data flows during execution (e.g. partitioning and sortedness).
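The difference between "what to compute" and "how to compute it" can be shown with a toy planner that picks a join algorithm from the sortedness of its inputs. The `Logical` and `Physical` enums and both functions are assumptions made for this sketch; in DataFusion itself, join selection is handled by physical optimizer rules and involves more factors than this.

```rust
// Illustrative only: these enums are simplified stand-ins, not DataFusion types.
#[derive(Debug, Clone, PartialEq)]
enum Logical {
    Scan { table: String, sorted_on: Option<String> },
    Join { left: Box<Logical>, right: Box<Logical>, key: String },
}

#[derive(Debug, Clone, PartialEq)]
enum Physical {
    Scan { table: String },
    HashJoin { left: Box<Physical>, right: Box<Physical>, key: String },
    SortMergeJoin { left: Box<Physical>, right: Box<Physical>, key: String },
}

// Returns the column a plan's output is known to be sorted on, if any.
// For simplicity, join output is treated as unsorted.
fn known_sort_key(plan: &Logical) -> Option<&str> {
    match plan {
        Logical::Scan { sorted_on, .. } => sorted_on.as_deref(),
        Logical::Join { .. } => None,
    }
}

// The logical Join says what to compute; the planner decides how:
// a merge join when both inputs are already sorted on the key,
// otherwise a hash join.
fn plan_physical(plan: &Logical) -> Physical {
    match plan {
        Logical::Scan { table, .. } => Physical::Scan { table: table.clone() },
        Logical::Join { left, right, key } => {
            let both_sorted = known_sort_key(left) == Some(key.as_str())
                && known_sort_key(right) == Some(key.as_str());
            let (l, r) = (Box::new(plan_physical(left)), Box::new(plan_physical(right)));
            if both_sorted {
                Physical::SortMergeJoin { left: l, right: r, key: key.clone() }
            } else {
                Physical::HashJoin { left: l, right: r, key: key.clone() }
            }
        }
    }
}
```

The same logical `Join` maps to different physical nodes depending on input properties, which is why the physical plan, not the logical one, carries details such as sortedness.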

Execution

           ExecutionPlan::execute             Calling next() on the
           produces a stream                  stream produces the data

┌───────────────┐      ┌─────────────────────────┐         ┌────────────┐
│ProjectExec    │      │impl                     │    ┌───▶│RecordBatch │
│  ...          │─────▶│SendableRecordBatchStream│────┤    └────────────┘
│    ParquetExec│      │                         │    │    ┌────────────┐
└───────────────┘      └─────────────────────────┘    ├───▶│RecordBatch │
              ▲                                       │    └────────────┘
ExecutionPlan │                                       │         ...
              │                                       │
              │                                       │    ┌────────────┐
            PhysicalOptimizerRules                    ├───▶│RecordBatch │
            request information                       │    └────────────┘
            such as partitioning                      │    ┌ ─ ─ ─ ─ ─ ─
                                                      └───▶ None        │
                                                           └ ─ ─ ─ ─ ─ ─

ExecutionPlans process data using the Apache Arrow memory format, making heavy use of functions from the arrow crate. Calling execute produces 1 or more partitions of data, each delivered as a SendableRecordBatchStream.
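The pull-based flow in the diagram, where each call yields a batch until `None` signals the end, can be sketched synchronously with the standard library. `Batch`, `BatchStream`, `ScanStream`, `FilterStream`, and `collect_all` are hypothetical names for this example; DataFusion's streams are asynchronous and yield Arrow RecordBatches rather than vectors of integers.

```rust
// Toy pull-based execution. All names are hypothetical; DataFusion streams
// are async and yield Arrow RecordBatches.
type Batch = Vec<i64>;

trait BatchStream {
    // Returns the next batch, or None once the input is exhausted.
    fn next_batch(&mut self) -> Option<Batch>;
}

// Leaf operator: hands out pre-loaded batches one at a time.
struct ScanStream {
    batches: std::vec::IntoIter<Batch>,
}

impl BatchStream for ScanStream {
    fn next_batch(&mut self) -> Option<Batch> {
        self.batches.next()
    }
}

// Operator that pulls from its input and keeps values >= `min`,
// processing a whole batch per call rather than one row per call.
struct FilterStream<S: BatchStream> {
    input: S,
    min: i64,
}

impl<S: BatchStream> BatchStream for FilterStream<S> {
    fn next_batch(&mut self) -> Option<Batch> {
        let batch = self.input.next_batch()?;
        Some(batch.into_iter().filter(|v| *v >= self.min).collect())
    }
}

// Drains a stream into a vector of batches.
fn collect_all(mut stream: impl BatchStream) -> Vec<Batch> {
    let mut out = Vec::new();
    while let Some(batch) = stream.next_batch() {
        out.push(batch);
    }
    out
}
```

Wrapping a `ScanStream` in a `FilterStream` and passing it to `collect_all` pulls batches through the whole operator chain on demand, so no operator runs until its consumer asks for data.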

Values are represented with ColumnarValue, which are either ScalarValue (single constant values) or ArrayRef (Arrow Arrays).
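One reason to distinguish scalars from arrays is that a constant can be combined with a whole column without first being expanded into an array. The sketch below shows that idea with a hypothetical `ToyValue` enum over `i64`; it is an assumption for illustration and not the real ColumnarValue, which wraps ScalarValue and ArrayRef.

```rust
// Hypothetical simplification of the scalar-or-array idea.
#[derive(Debug, Clone, PartialEq)]
enum ToyValue {
    Scalar(i64),
    Array(Vec<i64>),
}

// Adds two values, broadcasting a scalar across an array when needed.
// Returns None if two arrays have different lengths.
fn add(left: &ToyValue, right: &ToyValue) -> Option<ToyValue> {
    use ToyValue::{Array, Scalar};
    match (left, right) {
        (Scalar(a), Scalar(b)) => Some(Scalar(a + b)),
        (Scalar(s), Array(arr)) | (Array(arr), Scalar(s)) => {
            Some(Array(arr.iter().map(|v| v + s).collect()))
        }
        (Array(a), Array(b)) if a.len() == b.len() => {
            Some(Array(a.iter().zip(b).map(|(x, y)| x + y).collect()))
        }
        (Array(_), Array(_)) => None,
    }
}
```

With this model, an expression such as `a + 10` evaluates by pairing the array for column `a` with a single scalar, instead of materializing a second array filled with tens.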

Balanced parallelism is achieved using RepartitionExec, which implements a Volcano style “Exchange”.
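As a rough picture of what an exchange does, the function below redistributes batches from any number of input partitions across a chosen number of output partitions in round-robin order. `round_robin` is a hypothetical helper written for this sketch, and round-robin is only one possible distribution strategy; it does not reproduce how RepartitionExec works internally.

```rust
// Toy round-robin exchange: spreads batches from all input partitions
// evenly over `n` output partitions. Hypothetical helper, not RepartitionExec.
fn round_robin<T>(inputs: Vec<Vec<T>>, n: usize) -> Vec<Vec<T>> {
    assert!(n > 0, "need at least one output partition");
    let mut outputs: Vec<Vec<T>> = (0..n).map(|_| Vec::new()).collect();
    // Visit batches partition by partition and deal them out in turn.
    for (i, batch) in inputs.into_iter().flatten().enumerate() {
        outputs[i % n].push(batch);
    }
    outputs
}
```

Even when one input partition holds all of the data and another is empty, the outputs end up within one batch of each other in size, which lets downstream operators run with balanced parallelism.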

See the implementors of ExecutionPlan for a list of physical operators available.

State Management and Configuration

ConfigOptions contains options to control DataFusion’s execution.

The state required to execute queries is managed by the following structures:

  1. SessionContext: State needed to create LogicalPlans, such as the table definitions and the function registries.

  2. TaskContext: State needed for execution such as the MemoryPool, DiskManager, and ObjectStoreRegistry.

  3. ExecutionProps: Per-execution properties and data (such as starting timestamps).

Resource Management

The amount of memory and temporary local disk space used by DataFusion when running a plan can be controlled using the MemoryPool and DiskManager.
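The general idea of bounding memory through a shared pool can be sketched as follows: operators ask the pool before allocating, and the pool refuses requests that would exceed its limit. `ToyMemoryPool` and its methods are hypothetical and written only for this illustration; they are not DataFusion's MemoryPool trait, and the choice to fail rather than spill to disk is an assumption of the sketch.

```rust
// Hypothetical bounded pool; names and behavior are illustrative assumptions,
// not DataFusion's MemoryPool trait.
#[derive(Debug)]
struct ToyMemoryPool {
    limit: usize,
    used: usize,
}

impl ToyMemoryPool {
    fn new(limit: usize) -> Self {
        Self { limit, used: 0 }
    }

    // Reserve `bytes`, failing without side effects if the limit would be exceeded.
    fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
        match self.used.checked_add(bytes) {
            Some(total) if total <= self.limit => {
                self.used = total;
                Ok(())
            }
            _ => Err(format!(
                "cannot reserve {bytes} bytes: {} of {} already used",
                self.used, self.limit
            )),
        }
    }

    // Return previously reserved bytes to the pool.
    fn shrink(&mut self, bytes: usize) {
        self.used = self.used.saturating_sub(bytes);
    }
}
```

An operator that receives an error from `try_grow` can react, for example by writing intermediate data to temporary disk space, instead of exhausting the memory of the process.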

Crate Organization

DataFusion is organized into multiple crates to enforce modularity and improve compilation times.

Macros

  • Compares the pretty-formatted output of record batches with an expected vector of strings. This is a macro so that errors appear on the correct line.
  • Compares the pretty-formatted output of record batches with an expected vector of strings in a way that ignores row order. This is a macro so that errors appear on the correct line.